package mq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.List;

public class MQTest {

    public static void pullConsumer() {

    }

    public static void pushConsumer() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("first_rocketMQ_consumer_group_name");
        consumer.setNamesrvAddr("192.168.199.128:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe("first_topic", "weng");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    System.out.println(msgs.size());
                    for (MessageExt messageExt : msgs) {
                        System.out.println(System.currentTimeMillis()+":"+new String(messageExt.getBody()));
                        Thread.sleep(10);

                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

    public static void syncProducer(DefaultMQProducer producer) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {

        Message msg = new Message("first_topic", "weng", (Thread.currentThread().getName() + "Hello 翁剑平" + System.currentTimeMillis()).getBytes());
        SendResult sendResult = producer.send(msg);
        System.out.println(Thread.currentThread().getName() + ": " + sendResult);
        /**
         * 0: SendResult [sendStatus=SEND_OK, msgId=C0A8028C0F984629104A6B811A4F0000, offsetMsgId=C0A8C78000002A9F0000000000083C16, messageQueue=MessageQueue [topic=first_topic, brokerName=lw, queueId=3], queueOffset=0]
         * 1: SendResult [sendStatus=SEND_OK, msgId=C0A8028C0F984629104A6B811AA80001, offsetMsgId=C0A8C78000002A9F0000000000083CCE, messageQueue=MessageQueue [topic=first_topic, brokerName=lw, queueId=0], queueOffset=0]
         * 2: SendResult [sendStatus=SEND_OK, msgId=C0A8028C0F984629104A6B811AC90002, offsetMsgId=C0A8C78000002A9F0000000000083D86, messageQueue=MessageQueue [topic=first_topic, brokerName=lw, queueId=1], queueOffset=0]
         * 3: SendResult [sendStatus=SEND_OK, msgId=C0A8028C0F984629104A6B811AD10003, offsetMsgId=C0A8C78000002A9F0000000000083E3E, messageQueue=MessageQueue [topic=first_topic, brokerName=lw, queueId=2], queueOffset=0]
         * 4: SendResult [sendStatus=SEND_OK, msgId=C0A8028C0F984629104A6B811AD70004, offsetMsgId=C0A8C78000002A9F0000000000083EF6, messageQueue=MessageQueue [topic=first_topic, brokerName=lw, queueId=3], queueOffset=1]
         * 5: SendResult [sendStatus=SEND_OK, msgId=C0A8028C0F984629104A6B811AE20005, offsetMsgId=C0A8C78000002A9F0000000000083FAE, messageQueue=MessageQueue [topic=first_topic, brokerName=lw, queueId=0], queueOffset=1]
         * 6: SendResult [sendStatus=SEND_OK, msgId=C0A8028C0F984629104A6B811AE40006, offsetMsgId=C0A8C78000002A9F0000000000084066, messageQueue=MessageQueue [topic=first_topic, brokerName=lw, queueId=1], queueOffset=1]
         * 7: SendResult [sendStatus=SEND_OK, msgId=C0A8028C0F984629104A6B811AE70007, offsetMsgId=C0A8C78000002A9F000000000008411E, messageQueue=MessageQueue [topic=first_topic, brokerName=lw, queueId=2], queueOffset=1]
         * 8: SendResult [sendStatus=SEND_OK, msgId=C0A8028C0F984629104A6B811AEB0008, offsetMsgId=C0A8C78000002A9F00000000000841D6, messageQueue=MessageQueue [topic=first_topic, brokerName=lw, queueId=3], queueOffset=2]
         * 9: SendResult [sendStatus=SEND_OK, msgId=C0A8028C0F984629104A6B811AF60009, offsetMsgId=C0A8C78000002A9F000000000008428E, messageQueue=MessageQueue [topic=first_topic, brokerName=lw, queueId=0], queueOffset=2]
         */

    }


    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("first_rocketMQ_producer_group_name");
        producer.setNamesrvAddr("192.168.199.128:9876");
        try {
            producer.start();
            for (int i = 0; i < 1000; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {


                        try {
                            while (true) {
                                syncProducer(producer);
                            }
                        } catch (MQClientException e) {
                            e.printStackTrace();
                        } catch (RemotingException e) {
                            e.printStackTrace();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (MQBrokerException e) {
                            e.printStackTrace();
                        }


                    }
                }).start();
            }
        } catch (MQClientException e) {
            e.printStackTrace();
        }


    }
}
